Add low-latency raw memory search#173
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a low-latency raw search endpoint and enhances the existing search functionality with optional answer synthesis and latency tracking. Key changes include the implementation of search_raw and answer_from_sources in the retrieval pipeline, the addition of TTL-based caching for profile catalogs and retrieval plans, and the inclusion of detailed latency metrics in search responses. Feedback focuses on optimizing performance by parallelizing domain searches, ensuring the profile catalog retrieval is asynchronous to avoid blocking the event loop, and managing cache memory usage through bounded collections and hashed keys.
| if "profile" in domain_set: | ||
| results.extend(await self._search_profile_raw(query, user_id, top_k)) | ||
| if "temporal" in domain_set: | ||
| results.extend(await self._search_temporal(query, user_id, top_k)) | ||
| if "summary" in domain_set: | ||
| results.extend(await self._search_summary(query, user_id, top_k)) | ||
| if "snippet" in domain_set: | ||
| results.extend(await self._search_snippet(query, user_id, top_k)) |
There was a problem hiding this comment.
The current implementation of search_raw executes searches across different domains sequentially. To achieve true low-latency as intended by this PR, these searches should be executed in parallel using asyncio.gather.
| if "profile" in domain_set: | |
| results.extend(await self._search_profile_raw(query, user_id, top_k)) | |
| if "temporal" in domain_set: | |
| results.extend(await self._search_temporal(query, user_id, top_k)) | |
| if "summary" in domain_set: | |
| results.extend(await self._search_summary(query, user_id, top_k)) | |
| if "snippet" in domain_set: | |
| results.extend(await self._search_snippet(query, user_id, top_k)) | |
| domain_set = set(domains) | |
| tasks = [] | |
| if "profile" in domain_set: | |
| tasks.append(self._search_profile_raw(query, user_id, top_k)) | |
| if "temporal" in domain_set: | |
| tasks.append(self._search_temporal(query, user_id, top_k)) | |
| if "summary" in domain_set: | |
| tasks.append(self._search_summary(query, user_id, top_k)) | |
| if "snippet" in domain_set: | |
| tasks.append(self._search_snippet(query, user_id, top_k)) | |
| task_results = await asyncio.gather(*tasks) | |
| results: List[SourceRecord] = [item for sublist in task_results for item in sublist] |
There was a problem hiding this comment.
Resolved in the current head: search_raw now builds per-domain tasks and awaits asyncio.gather(*tasks, return_exceptions=True), so the raw path runs requested domain searches concurrently while preserving healthy domain results if one domain fails. The same concurrent path now includes the code domain as well.
| def _get_profile_catalog(self, user_id: str): | ||
| cached = self._profile_catalog_cache.get(user_id) | ||
| now = time.monotonic() | ||
| if cached and cached[0] > now: | ||
| return cached[1], cached[2] | ||
|
|
||
| catalog, results = self._fetch_profile_catalog(user_id) | ||
| self._profile_catalog_cache[user_id] = ( | ||
| now + _CACHE_TTL_SECONDS, | ||
| catalog, | ||
| results, | ||
| ) | ||
| return catalog, results |
There was a problem hiding this comment.
The _get_profile_catalog method is synchronous and performs network I/O via _fetch_profile_catalog (which calls vector_store.search_by_metadata). Calling this from an async context like run or search_raw will block the entire event loop, significantly impacting performance and defeating the purpose of a low-latency path. This should be made asynchronous.
| def _get_profile_catalog(self, user_id: str): | |
| cached = self._profile_catalog_cache.get(user_id) | |
| now = time.monotonic() | |
| if cached and cached[0] > now: | |
| return cached[1], cached[2] | |
| catalog, results = self._fetch_profile_catalog(user_id) | |
| self._profile_catalog_cache[user_id] = ( | |
| now + _CACHE_TTL_SECONDS, | |
| catalog, | |
| results, | |
| ) | |
| return catalog, results | |
| async def _get_profile_catalog(self, user_id: str): | |
| cached = self._profile_catalog_cache.get(user_id) | |
| now = time.monotonic() | |
| if cached and cached[0] > now: | |
| return cached[1], cached[2] | |
| # Assuming search_by_metadata is made async or wrapped in an executor | |
| catalog, results = await self._fetch_profile_catalog(user_id) | |
| self._profile_catalog_cache[user_id] = ( | |
| now + _CACHE_TTL_SECONDS, | |
| catalog, | |
| results, | |
| ) | |
| return catalog, results |
There was a problem hiding this comment.
Resolved in the current head: _get_profile_catalog is now async, and the blocking metadata fetch runs through asyncio.to_thread(self._fetch_profile_catalog, user_id), so the retrieval path no longer blocks the event loop while loading the profile catalog.
| self._profile_catalog_cache: Dict[str, tuple[float, List[Dict[str, str]], List[Any]]] = {} | ||
| self._retrieval_plan_cache: Dict[tuple[str, str, int, str], tuple[float, AIMessage]] = {} | ||
| self._latency_samples: Dict[str, List[float]] = {} |
There was a problem hiding this comment.
The caches _profile_catalog_cache and _retrieval_plan_cache are unbounded dictionaries that only expire entries on access. This can lead to a memory leak as entries for users who do not return will persist indefinitely. Consider using a cache with a maximum size and an eviction policy (e.g., cachetools.TTLCache).
There was a problem hiding this comment.
Resolved in the current head: _profile_catalog_cache and _retrieval_plan_cache are now OrderedDict caches with TTL checks plus explicit max-size trimming via _trim_cache. Latency samples are also bounded to avoid unbounded growth.
| ] | ||
|
|
||
| ai_response: AIMessage = await self.model_with_tools.ainvoke(messages) | ||
| plan_key = (user_id, query.strip(), top_k, catalog_text) |
There was a problem hiding this comment.
Using the entire catalog_text as part of the cache key for _retrieval_plan_cache can be memory-intensive if the catalog is large. Consider using a hash of the catalog_text instead.
import hashlib
catalog_hash = hashlib.sha256(catalog_text.encode()).hexdigest()
plan_key = (user_id, query.strip(), top_k, catalog_hash)There was a problem hiding this comment.
Resolved in the current head: the retrieval plan key now uses hashlib.sha256(catalog_text.encode(utf-8)).hexdigest() instead of storing the full catalog text in the cache key.
|
Follow-up after the latest push at that time: The earlier performance review items were addressed in the same branch: raw domain searches run concurrently, profile catalog lookup is async, and both caches are bounded with hashed plan keys. Local verification at that point was Update from the same 2026-05-11 head: GitHub Actions Test Suite was green, with Unit, API, and Integration Tests and End-to-End Tests passing on |
|
@strongkeep-debug thank you for your contribution pls review the gemini suggestions and resolve them pls make sure to also add comment on the suggestions :) |
|
@ved015 done. I replied directly on all four Gemini suggestions and verified that branch head covered them at the time: raw domain searches run concurrently with asyncio.gather, profile catalog loading is async via asyncio.to_thread, the profile/plan caches and latency samples are bounded, and the plan cache key uses a SHA-256 catalog hash. PR checks were green on that earlier head; the newer test-only follow-up comment below has the current CI state. |
|
Small follow-up pushed: Local validation on the current head: The PR label check passed on the new head. The Test Suite workflow is currently marked |
|
Since the hosted Test Suite is still The score-serialization concern is covered on this branch in both places that can emit scores: Current local validation: |
Ankit-Kotnala
left a comment
There was a problem hiding this comment.
Thanks for the PR. One blocker before merge: raw code search seems to query the default retrieval vector store, but code annotations are written to the annotations namespace via code_vector_store.
Can you update _search_code() to read from the same annotations namespace used by ingest/weaver, and add a regression test for that? After that this should be good to merge.
|
@Ankit-Kotnala addressed in Raw code search now reads from the code annotation vector store instead of the default memory store. In the production path, I added a regression test that seeds a stale Validation run locally: |
51c4150 to
fa5da96
Compare
|
Updated PR #173 on current This resolves the stale/dirty branch state while keeping the requested code-search fix: production Local validation after the rebase: Hosted |
|
@codex pls review this pr for any bugs report actual one only |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: fa5da964be
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
|
@codex pls review this pr and report actual bugs if any |
|
Codex Review: Didn't find any major issues. More of your lovely PRs please. ℹ️ About Codex in GitHubYour team has set up Codex to review pull requests in this repo. Reviews are triggered when you
If Codex has suggestions, it will comment; otherwise it will react with 👍. Codex can also answer questions or update the PR. Try commenting "@codex address that feedback". |
|
CI-only follow-up pushed in What changed:
Local validation on the new head: The new hosted workflow runs are currently |
|
Update for reviewers: current head What changed since
Local validation on the current head: Hosted |
3bf43f7 to
ec8d3ed
Compare
|
Rebased this PR onto current The code-search blocker from the prior review remains addressed in the rebased head: raw code search reads from the annotations-backed Validation: Could you re-review the code-store blocker when you have a chance? |
d3f97f6 to
46affc9
Compare
This reverts commit a4cdba1.
46affc9 to
9aaf5f8
Compare
|
Re-reviewed the latest head. The code-store issue looks fixed now: raw code search is using the annotations/code vector store instead of the default memory store, and the regression test covers the stale default-store case. I also checked the raw search flow, answer mode, score normalization, concurrent domain search, and cache invalidation after ingest. The targeted route and retrieval tests pass locally for me. @ishaanxgupta Looks good from my side. |
|
| Filename | Overview |
|---|---|
| src/pipelines/retrieval.py | Core pipeline rewrite adding search_raw, answer_from_sources, LRU caches, latency sampling, and a dedicated code annotation store. Contains a redundant double score-normalisation pass in search_raw and a subtle code_vector_store fallback that routes to the wrong store when only vector_store is injected. |
| src/api/routes/memory.py | Adds search_router for the root /search alias, delegates to pipeline.search_raw, and adds answer synthesis and latency recording. The latency snapshot included in every response exposes cross-user aggregate telemetry and agentic-mode stats to all callers. |
| src/api/schemas.py | Extends SearchRequest with answer, snippet/code domains, and adds answer/model/confidence/latency to SearchResponse. Missing the strip_query validator that RetrieveRequest has. |
| src/api/app.py | Registers the new search_router (root /search alias) before memory_router; minor cleanup of unused variable in sentry-debug endpoint. |
| tests/integration/test_retrieval_pipeline.py | Comprehensive new tests covering caching, raw search, concurrency, score normalisation, and code store isolation. |
| tests/api/test_memory_search_routes.py | New API-layer tests covering raw hits, answer synthesis, code domain, and cache invalidation on ingest. |
Sequence Diagram
sequenceDiagram
participant C as Client
participant R as search_router(/search) or router(/v1/memory/search)
participant P as RetrievalPipeline
participant VS as VectorStore
participant CVS as CodeVectorStore
participant N as Neo4jClient
participant LLM as LLM
C->>R: "POST {query, domains, top_k, answer}"
R->>P: search_raw(query, user_id, domains, top_k)
par profile in domains
P->>VS: "search_by_text(filter=profile)"
and temporal in domains
P->>N: search_events_by_embedding(query)
and summary in domains
P->>VS: "search_by_text(filter=summary)"
and snippet in domains
P->>VS: "search_by_text(filter=snippet)"
and code in domains
P->>CVS: "search_by_text(filter=code)"
end
P-->>R: List[SourceRecord] ranked by score
alt "answer=true"
R->>P: answer_from_sources(query, sources)
P->>LLM: ainvoke(ANSWER_PROMPT + context)
LLM-->>P: answer text
P-->>R: str answer
end
R->>P: record_latency(mode, elapsed_ms)
R->>P: get_latency_snapshot()
P-->>R: "{raw/answer/agentic: {count, p50, p95, p99}}"
R-->>C: "APIResponse{results, total, answer, model, confidence, latency}"
Reviews (1): Last reviewed commit: "Invalidate profile cache after memory in..." | Re-trigger Greptile
| answer=answer, | ||
| model=_model_name(pipeline.model) if req.answer else "", | ||
| confidence=min(1.0, len(all_results) * 0.2) if answer else 0.0, | ||
| latency=pipeline.get_latency_snapshot(), |
There was a problem hiding this comment.
Cross-user latency telemetry in every response
get_latency_snapshot() returns the shared pipeline singleton's accumulated _latency_samples dict, which collects data from all users and all modes (raw, answer, and agentic). Every authenticated caller therefore receives a latency object that includes the count and percentiles of other users' requests — including from the /v1/memory/retrieve agentic endpoint that has nothing to do with search. The count value reveals how many recent requests have been processed system-wide, making this a side-channel that leaks activity patterns across the user base. This data belongs in the existing Prometheus /metrics endpoint, not in a per-call user response.
| @field_validator("domains") | ||
| @classmethod | ||
| def validate_domains(cls, v: List[str]) -> List[str]: |
There was a problem hiding this comment.
Missing
strip_query validator on SearchRequest
RetrieveRequest strips leading/trailing whitespace from query via a field_validator, but SearchRequest does not. The agentic pipeline's plan cache already normalises with query.strip() on its own, so a search for "latency " and "latency" would hit the same cache entry but be stored as different raw keys. Adding the validator keeps the two request models consistent.
| @field_validator("domains") | |
| @classmethod | |
| def validate_domains(cls, v: List[str]) -> List[str]: | |
| @field_validator("query") | |
| @classmethod | |
| def strip_query(cls, v: str) -> str: | |
| return v.strip() | |
| @field_validator("domains") | |
| @classmethod | |
| def validate_domains(cls, v: List[str]) -> List[str]: |
| if code_vector_store is not None: | ||
| self.code_vector_store = code_vector_store | ||
| elif vector_store is None: | ||
| self.code_vector_store = get_vector_store( | ||
| namespace=annotations_namespace(org_id), | ||
| create_if_not_exists=False, | ||
| ) | ||
| else: | ||
| self.code_vector_store = self.vector_store |
There was a problem hiding this comment.
code_vector_store falls back to the main store when a custom vector_store is passed
The condition elif vector_store is None means: when a caller supplies an explicit vector_store (but no code_vector_store), code annotations are read from that same general-purpose store rather than the org-scoped annotations namespace. In production this path is likely never hit (both args are None), but any integration test that passes only vector_store= will silently route code-domain queries to the wrong namespace and miss real annotation data. Consider changing the condition to elif code_vector_store is None or at least documenting the fallback intent clearly.
| results = [ | ||
| record | ||
| for domain_results in task_results | ||
| if not self._log_search_error(domain_results) | ||
| for record in domain_results | ||
| ] | ||
| for record in results: | ||
| record.score = self._score_value(record.score) | ||
|
|
||
| return sorted(results, key=lambda record: record.score, reverse=True) |
There was a problem hiding this comment.
Double application of
_score_value in search_raw
Every per-domain search helper (_search_profile_raw, _search_temporal, _search_summary, _search_snippet, _search_code) already calls self._score_value(r.score) when constructing each SourceRecord. The loop at the end of search_raw then calls _score_value a second time on every record. While idempotent for well-formed floats, it is redundant work and could confuse readers into thinking the inner helpers do not normalise scores. The second pass can be removed.
Addresses #163.
This PR turns memory search into a true low-latency path. Raw search now goes through
RetrievalPipeline.search_rawand returns ranked profile, temporal, summary, snippet, and code annotation hits without retrieval-plan tool selection.answer=truesynthesizes from those already-fetched hits when a caller wants a generated answer, and the root/searchalias is wired for clients that need the shorter path.test_raw_search_returns_ranked_hits_without_tool_selectionconfirms no tool-selection call is made and verifies a code hit keeps file and symbol metadata./v1/memory/searchacceptscodein the domain list and includes it in the default raw search domain set.test_memory_search_route_accepts_code_domaincovers the request validator and serialized response shape.answer=truesynthesizes from collected raw hits without doing agentic retrieval planning first.test_root_search_alias_can_synthesize_answercovers the alias and answer mode.test_retrieval_pipeline_caches_catalog_and_retrieval_plancovers cache reuse.test_raw_search_skips_failed_domains_and_normalizes_scoresand the API route regression cover both pipeline and serialization behavior.Validation was run locally: